use crate::run_plan::{HandResult, WindowResult};
use crate::data_frame::{value::{RealValue, ElementValue}};
use std::collections::hash_map::DefaultHasher;
use crate::data_frame::single_data::SingleData;
use std::hash::Hasher;
use num_traits::cast::AsPrimitive;
use async_recursion::async_recursion;
use crate::run_plan::accumulator::{logical_plan_accumulator_comb_to_run_plan, RunPlanExprAccumulatorComb};
use crate::physical_plan::PhysicalPlanExprArg;
use crate::run_plan::func::RunPlanExprFunc;
use crate::physical_plan::window::{PartitionHasher, Window};
use crate::data_frame::window::WindowElementDataInstance;
use crate::logical_plan::expr::window::LogicalPlanExprWindow;
use crate::conf_init::MapResult;
use crate::logical_plan::expr::partition_by::partition_by_map;
use crate::logical_plan::expr::func::logical_plan_expr_func_map_to_run_plan_expr_func;

#[derive(Debug)]
pub enum RunPlanExprPartitionType {
    Order(Vec<Box<RunPlanExprFunc>>),
    DisOrder(Vec<Box<RunPlanExprFunc>>),
}

#[async_trait]
impl PartitionHasher<SingleData> for RunPlanExprPartitionType{

    async fn hash_value(&mut self, mut d: SingleData)->usize{

        match self{
            RunPlanExprPartitionType::Order(order) => {
                let mut hasher = DefaultHasher::new();
                for t in order.iter_mut(){
                    if let HandResult::Ok(v) = t.evaluate_single(&mut d).await{
                        match v{
                            RealValue::U8(v) => {hasher.write_u8(v);},
                            RealValue::U16(v) => {hasher.write_u16(v);},
                            RealValue::U32(v) => {hasher.write_u32(v);},
                            RealValue::U64(v) => {hasher.write_u64(v);},
                            RealValue::I8(v) => {hasher.write_i8(v);},
                            RealValue::I16(v) => {hasher.write_i16(v);},
                            RealValue::I32(v) => {hasher.write_i32(v);},
                            RealValue::I64(v) => {hasher.write_i64(v);},
                            RealValue::F32(_v) => {},
                            RealValue::F64(_v) => {},
                            RealValue::None => {},
                            RealValue::Utf8(v) => {hasher.write_str(v.as_str());},
                            RealValue::Boolean(v) => {hasher.write_u8(v.as_());},
                            RealValue::Range(v1,v2) => {hasher.write_u64(v1);hasher.write_u64(v2);},
                            RealValue::DateTime(_v) => {
                                /*
                                    DateTime不作为hasher因素的原因就是几乎都会分成不同的组，暂时没看到意义。
                                */
                            },
                            RealValue::Duration(_v) => {
                                /*
                                    间隔多少纳秒，也先不作为hasher因素吧，感觉也没有啥意义。
                                */
                            },
                            RealValue::Bytes(v) => {
                                for v in v.iter(){
                                    hasher.write_u8(*v);
                                }
                            },
                        }
                    }
                }
                hasher.finish() as usize
            },
            RunPlanExprPartitionType::DisOrder(disorder) => {
                let mut hash = 0;
                for t in disorder.iter_mut(){
                    if let HandResult::Ok(v) = t.evaluate_single(&mut d).await{
                        match v{
                            RealValue::U8(v) => {
                                let mut hasher = DefaultHasher::new();
                                hasher.write_u8(v);
                                hash = hash ^ hasher.finish() as usize;
                            },
                            RealValue::U16(v) => {
                                let mut hasher = DefaultHasher::new();
                                hasher.write_u16(v);
                                hash = hash ^ hasher.finish() as usize;
                            },
                            RealValue::U32(v) => {
                                let mut hasher = DefaultHasher::new();
                                hasher.write_u32(v);
                                hash = hash ^ hasher.finish() as usize;
                            },
                            RealValue::U64(v) => {
                                let mut hasher = DefaultHasher::new();
                                hasher.write_u64(v);
                                hash = hash ^ hasher.finish() as usize;
                            },
                            RealValue::I8(v) => {
                                let mut hasher = DefaultHasher::new();
                                hasher.write_i8(v);
                                hash = hash ^ hasher.finish() as usize;
                            },
                            RealValue::I16(v) => {
                                let mut hasher = DefaultHasher::new();
                                hasher.write_i16(v);
                                hash = hash ^ hasher.finish() as usize;
                            },
                            RealValue::I32(v) => {
                                let mut hasher = DefaultHasher::new();
                                hasher.write_i32(v);
                                hash = hash ^ hasher.finish() as usize;
                            },
                            RealValue::I64(v) => {
                                let mut hasher = DefaultHasher::new();
                                hasher.write_i64(v);
                                hash = hash ^ hasher.finish() as usize;
                            },
                            RealValue::F32(_v) => {},
                            RealValue::F64(_v) => {},
                            RealValue::None => {},
                            RealValue::Utf8(v) => {
                                let mut hasher = DefaultHasher::new();
                                hasher.write_str(v.as_str());
                                hash = hash ^ hasher.finish() as usize;
                            },
                            RealValue::Boolean(v) => {
                                let mut hasher = DefaultHasher::new();
                                hasher.write_u8(v.as_());
                                hash = hash ^ hasher.finish() as usize;
                            },
                            RealValue::Range(v1,v2) => {
                                let mut hasher = DefaultHasher::new();
                                hasher.write_u64(v1);hasher.write_u64(v2);
                                hash = hash ^ hasher.finish() as usize;
                            },
                            RealValue::DateTime(_v) => {
                                /*
                                    DateTime不作为hasher因素的原因就是几乎都会分成不同的组，暂时没看到意义。
                                */
                            },
                            RealValue::Duration(_v) => {
                                /*
                                    间隔多少纳秒，也先不作为hasher因素吧，感觉也没有啥意义。
                                */
                            },
                            RealValue::Bytes(v) => {
                                let mut hasher = DefaultHasher::new();
                                for v in v.iter(){
                                    hasher.write_u8(*v);
                                }
                                hash = hash ^ hasher.finish() as usize;
                            },
                        }
                    }
                }
                hash
            },
        }
    }
}

pub struct RunPlanExprWindow {
    window_name: String,
    //partition_by: Option<RunPlanExprPartitionType>,
    sequential: Box<RunPlanExprFunc>,
    uniqueness: Box<RunPlanExprFunc>,
    window: Window<ElementValue,RealValue,WindowElementDataInstance,RunPlanExprPartitionType,RunPlanExprAccumulatorComb<SingleData,Box<PhysicalPlanExprArg>>>,
}

pub fn logical_plan_window_to_run_plan(window: &LogicalPlanExprWindow)
    ->MapResult<Box<RunPlanExprWindow>>{

    let partition_by = partition_by_map(&window.partition_by)?;
    let accumulator = logical_plan_accumulator_comb_to_run_plan(&window.accumulator)?;
    let sequential = logical_plan_expr_func_map_to_run_plan_expr_func(&window.sequential)?;
    let uniqueness = logical_plan_expr_func_map_to_run_plan_expr_func(&window.uniqueness)?;

    let window_name = window.name.clone();
    let window = Window::build_new(window.name.clone(),
                                   window.window_time,
                                   window.slide_time,
                                   window.wait_time,
                                   window.time_unit.clone(),
                                   partition_by,
                                   *accumulator, true)?;

    let window_run = RunPlanExprWindow {
        window_name: window_name,
        //partition_by: partition_by,
        sequential: sequential,
        uniqueness: uniqueness,
        window: window,
    };

    return MapResult::Ok(Box::new(window_run));
}

impl RunPlanExprWindow{
    #[async_recursion]
    pub async fn evaluate(&mut self, mut data: SingleData)->WindowResult{
        let sequential = self.sequential.evaluate_single(&mut data).await?;
        let uniqueness = self.uniqueness.evaluate_single(&mut data).await?;

        if let RealValue::U64(sequential) = sequential{
            if let RealValue::U64(uniqueness) = uniqueness{
                let data_window_index = WindowElementDataInstance::new(sequential,uniqueness,data);
                return WindowResult::Ok(self.window.insert(data_window_index).await);
            }else{
                return WindowResult::Err(());
            }
        }else{
            return WindowResult::Err(());
        }
    }
}



